package org.anyline.simple.influx;

import com.influxdb.client.*;
import com.influxdb.client.domain.Bucket;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import org.anyline.data.influxdb.entity.InfluxPoint;
import org.anyline.data.influxdb.entity.InfluxSet;
import org.anyline.data.influxdb.param.InfluxConfigStore;
import org.anyline.data.param.init.DefaultConfigStore;
import org.anyline.entity.DataRow;
import org.anyline.entity.DataSet;
import org.anyline.net.HttpUtil;
import org.anyline.proxy.ServiceProxy;
import org.apache.http.entity.StringEntity;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.UnsupportedEncodingException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@SpringBootTest
public class InfluxTest {
    private static String HOST = "http://localhost:38086";
    private static String ORG = "2dfed8a26c2bc29a";
    private static String TOKEN = "WRgQLBFCGGHt-SFiFbZw10AVs4htH7dsYpBa_vmnvhfJuToi7sFhBSOXLrT_fr5ASS4P8MPWc0JMcCLYhH-TzA==";
    @Test
    public void insert(){
        long now = Instant.now().toEpochMilli(); // 当前时间
        InfluxPoint point = new InfluxPoint("device_test");
        point.bucket("test");
        point.addTag("location", "New York")
            .addTag("locations", "New")
            .addField("f1", "v1")
            .addField("f2", "v2")
            .time(now, WritePrecision.MS); // 使用毫秒精度
        //可以在point上设置插入位置
        point.bucket("test");
        point.org(ORG);
        ServiceProxy.insert(point);


        //如果批量 可以在configs上设置 或 InfluxSet上设置
        List<InfluxPoint> points = new ArrayList<>();
        InfluxSet set = new InfluxSet();
        set.bucket("test");
        set.org(ORG);
        for(int i=0; i<10; i++){
            point = new InfluxPoint("device_test");
            point.bucket("test");
            point.addTag("location", "New York")
                .addTag("locations", "New")
                .addField("f1", "v1")
                .addField("f2", "v2")
                .time(now+i, WritePrecision.MS); // 使用毫秒精度
            points.add(point);
            set.add(point);
        }
        InfluxConfigStore configs = new InfluxConfigStore();
        configs.bucket("test");
        configs.org(ORG);

        ServiceProxy.insert(points, configs);
        ServiceProxy.insert(set);

        //如果都没设置 就用注册数据源时默认的bucket org
    }
    @Test
    public void query_sql(){
        DataSet set = ServiceProxy.querys("SELECT * FROM device_test where f1='v1' limit 3", new InfluxConfigStore().bucket("test"));
        System.out.println(set);
        set = ServiceProxy.querys("device_test", new InfluxConfigStore().bucket("test").and("f1","v1").limit(5, 6), "f2:v2");
        System.out.println(set);
        //表名的前缀 默认识别成bucket
        set = ServiceProxy.querys("test.device_test", 2, 8,"f1:v1", "f2='v2'");
        for(DataRow row:set){
            System.out.println(row);
        }
    }
    @Test
    public void query_flux(){
        String body = "from(bucket: \"test\")\n" +
                "          |> range(start: 2020-03-01T00:00:00Z)\n" +
                "          |> filter(fn: (r) => r._measurement == \"device_test\")";
        //注意这里 不支持offset
        DataSet set = ServiceProxy.querys(body, new DefaultConfigStore().limit(3));
        for(DataRow row:set){
            System.out.println(row);
        }
    }
    @Test
    public void query_config_flux(){
        InfluxConfigStore configs = new InfluxConfigStore();
        configs.bucket("test");
        //configs.and("_measurement", "device_test");
        configs.measurement("device_test");
        configs.start("0");
        DataSet set = ServiceProxy.querys(configs);
        System.out.println(set);
    }
    @Test
    public void buckets(){
        Map<String, String> header = new HashMap<>();
        header.put("Authorization","Token "+TOKEN);
        header.put("Accept", "application/csv");
        String url = HOST + "/api/v2/buckets";
        String result = HttpUtil.get(header, url).getText();

        System.out.println(result);

    }
    @Test
    public void sql(){
        Map<String, String> header = new HashMap<>();
        header.put("Authorization","Token "+TOKEN);
        header.put("Accept", "application/csv");
        String url = HOST + "/query?db=test";
        String sql = "SELECT * FROM device_test where f1='v1'";
        String encode = HttpUtil.encode(sql,false, true);
        url += "&q="+encode;
        String result = HttpUtil.post(header, url).getText();
        System.out.println(result);
    }
    @Test
    public void http_query() throws UnsupportedEncodingException {
        //https://docs.influxdata.com/influxdb/v2/api/#tag/Quick-start
        String body = "from(bucket: \"test\")\n" +
            "          |> range(start: 0)\n" +
            "          |> filter(fn: (r) => r._measurement == \"device_test\") |> limit(n:100)";

        Map<String, String> header = new HashMap<>();
        header.put("Authorization","Token "+TOKEN);
        header.put("Content-Type", "application/vnd.flux");
        header.put("Accept", "application/csv");

        String result = HttpUtil.post(header, HOST +"/api/v2/query?limit=10&offset=5&org="+ORG, new StringEntity(body)).getText();
        System.out.println(result);

    }

    @Test
    public void delete(){
        InfluxConfigStore configs = new InfluxConfigStore();
        configs.range("2020-03-01T00:00:00Z", "2025-03-01T00:00:00Z");
        //configs.measurement("device_test");
        //configs.and("f1", "v1");
        //如果指定则按 数据源默认配置
        configs.org(ORG);
        configs.bucket("test");
        ServiceProxy.delete(configs);
    }
    @Test
    public void http_delete(){
        /*curl --request POST http://localhost:8086/api/v2/delete?org=example-org&bucket=example-bucket \
  --header 'Authorization: Token YOUR_API_TOKEN' \
  --header 'Content-Type: application/json' \
  --data '{
    "start": "2020-03-01T00:00:00Z",
    "stop": "2020-11-14T00:00:00Z",
    "predicate": "_measurement=\"example-measurement\" AND exampleTag=\"exampleTagValue\""

    //2006-01-02T15:04:05.999999999Z07:00.
    {"start":"2020-03-01T00:00:00Z","stop":"2025-03-01T00:00:00Z","predicate":"f1 = 'v1'  AND  _measurement = 'device_test'"}
  }*/
    }


    @Test
    public void test(){
        String body = "from(bucket: \"test\") |> range(start: 0) |> filter(fn: (r) => r._measurement == \"device_test\")";
       boolean b = body.toLowerCase().matches(".*\\|>\\s*limit.*\\(.+");
       System.out.println(b);
    }
    @Test
    public void client(){

        InfluxDBClientOptions options = InfluxDBClientOptions.builder()
            .url(HOST)
            .authenticateToken(TOKEN.toCharArray())
            //如果不设置org bucket 在读写数据时需要提供
            //.org("2dfed8a26c2bc29a")
            //.bucket("test")
            .build();
        InfluxDBClient client = InfluxDBClientFactory.create(options);
        OrganizationsApi organizationsApi = client.getOrganizationsApi();
        System.out.println(organizationsApi.findOrganizations());




        List<Bucket> list =  client.getBucketsApi().findBuckets();
        // 查询所有bucket
        for (Bucket bucket : list) {
            System.out.println(bucket.getName());
        }

        Instant now = Instant.now(); // 当前时间
        List<Point> ps = new ArrayList<>();
        Point p1 = Point.measurement("device_test")
            .addTag("location", "New York")
            .addTag("locations", "New")
            .addField("f1", "v1")
            .addField("f2", "v2")
            .time(now.toEpochMilli(), WritePrecision.MS); // 使用毫秒精度

        Point p2 = Point.measurement("device_test")
            .addTag("location", "New York")
            .addTag("locations", "New")
            .addField("f1", "v1")
            .addField("f2", "v2")
            .time(now.toEpochMilli(), WritePrecision.MS); // 使用毫秒精度

        Point p3 = Point.measurement("device_test")
            .addTag("location", "New York")
            .addTag("locations", "New")
            .addField("f1", "v1")
            .addField("f2", "v2")
            .time(now.toEpochMilli()+1, WritePrecision.MS); // 使用毫秒精度
        ps.add(p1);

        WriteApiBlocking api = client.getWriteApiBlocking();
        //插入一行
        System.out.println(p1.toLineProtocol());
        api.writeRecord("test", ORG,WritePrecision.MS, p1.toLineProtocol());
        //插入多行
        api.writePoints("test", ORG, ps);
        api.writePoint("test", ORG, p1);


        QueryApi query = client.getQueryApi();

        String flux = "from(bucket:\"test\") |> range(start: 0)";
        // Query data
        List<FluxTable> tables = query.query(flux, ORG);
        int idx = 0;
        for (FluxTable table : tables) {
            System.out.println("\ntable:"+table);
            List<FluxRecord> records = table.getRecords();
            for (FluxRecord record : records) {
                //System.out.println("GROUP:"+fluxTable.getGroupKey()+":\n TIME:"+record.getTime() + "\n VALUE:" + record.getValueByKey("_value"));
                System.out.println("\n"+idx+++" field:"+record.getField());
                record.getValues().forEach((k,v)->{
                    System.out.println(k +"="+v+"("+v.getClass()+")");
                });
            }
        }
    }

}
